24 实践课-基于 LLM 的意图识别模型开发

基于 LLM 的意图识别模型开发

关联:索引

术语小抄(初学者版)

python -m pip install -U sentence-transformers numpy

解释与自检:

环境兼容提醒(若你同时使用《机器人与智能系统开发技术》的 YOLOv8 + ROS2 视觉节点环境,本节为跨课程提醒):

python -m pip install -U "numpy==1.26.4" sentence-transformers

解释与自检:


1)分拣场景意图分类体系(示例口径)

说明:

2)意图体系的三条设计原则(写进标注规范)

  1. 互斥优先:同一条样本尽量只属于 1 个意图;如果确实复合,明确“主意图优先级”或拆成多轮交互(先澄清再执行)。
  2. 覆盖优先:必须设计 unknown/clarify,并写清触发条件(缺槽位/冲突/风险动作)。
  3. 可扩展优先:新增意图时不破坏旧数据:通过版本化(guideline_version/dataset_version)保持可追溯。

每行一个 JSON 对象(JSONL),字段口径如下:

{"sample_id":"S-0001","raw_text":"把红色苹果分到A通道","intent":"sort_by_color","slots":{"color":"red","target_lane":"A"},"scene":"sorting","guideline_version":"v0.1","ts_ms":1710000000000}
{"sample_id":"S-0002","raw_text":"把小果放到2号口","intent":"sort_by_size","slots":{"size":"small","target_lane":"2"},"scene":"sorting","guideline_version":"v0.1","ts_ms":1710000000500}
{"sample_id":"S-0003","raw_text":"这批有明显瑕疵的全部剔除","intent":"sort_by_defect","slots":{"defect_level":"severe","action":"reject"},"scene":"sorting","guideline_version":"v0.1","ts_ms":1710000000900}
{"sample_id":"S-0004","raw_text":"现在分拣到哪一步了?","intent":"query_sorting_status","slots":{},"scene":"sorting","guideline_version":"v0.1","ts_ms":1710000001200}
{"sample_id":"S-0004b","raw_text":"把A级苹果送到1号口","intent":"sort_by_grade","slots":{"grade":"A","target_lane":"1","source_topic":"/sorting/perception/apple_quality"},"scene":"sorting","guideline_version":"v0.1","ts_ms":1710000001250}
{"sample_id":"S-0004c","raw_text":"最近10秒相机识别的A/B/C各有多少?","intent":"query_apple_quality","slots":{"window_s":10,"metric":"grade_count","source_topic":"/sorting/perception/apple_quality"},"scene":"sorting","guideline_version":"v0.1","ts_ms":1710000001300}
{"sample_id":"S-0005","raw_text":"快停下!","intent":"device_control","slots":{"action":"stop"},"scene":"sorting","guideline_version":"v0.1","ts_ms":1710000001500}
{"sample_id":"S-0006","raw_text":"按它分一下","intent":"clarify","slots":{"missing":["by_color/by_size/by_defect"]},"scene":"sorting","guideline_version":"v0.1","ts_ms":1710000001800}
{"sample_id":"S-0006b","raw_text":"把视觉检测置信度阈值调到0.6","intent":"tune_vision_params","slots":{"conf_thres":0.6},"scene":"sorting","guideline_version":"v0.1","ts_ms":1710000001900}

解释与自检:

4)与 ROS2 视觉检测节点(YOLOv8)对齐:把“可用业务指标”纳入意图与槽位

说明(避免课程混淆):

来自《机器人与智能系统开发技术》《——视觉检测节点开发(YOLOv8 集成)》的可复用事实(跨课程引用;做意图体系设计时必须对齐这些字段,否则训练出来也接不进系统):

为什么要把它写进意图体系:

建议的“视觉联动槽位”口径(可直接写进你们的标注规范):

命令级取证(可选,但推荐在做“视觉联动”时留证据;Ubuntu 22.04 + ROS2 Humble 环境):

ros2 topic info /sorting/perception/apple_quality -v
ros2 topic echo /sorting/perception/apple_quality --once

解释与自检:


AI 工具使用:意图体系设计 / 数据清洗建议 / 训练代码生成 / 报错排查(学生可直接复制)

使用方法:把你们的“场景描述、意图列表草案、数据样例、报错日志”粘贴到 {你的内容}。要求 AI 输出结构化结果(表格/清单/步骤),便于你人工审计与落地实现。

模板目录:

模板 1:生成意图分类体系(含边界与例句)

你是工业分拣场景的意图识别设计师。请为我的场景设计“意图分类体系”,要求:
1)至少 5 类意图,必须包含 clarify 与 unknown;
2)每个意图写:定义、边界(不属于它的情况)、至少 5 条例句;
3)给出冲突处理规则(例如同时出现颜色与大小时怎么办);
4)必须覆盖“视觉检测联动”意图:sort_by_grade/query_apple_quality/tune_vision_params,并说明它们与 ROS2 视觉话题的关系(/sorting/perception/apple_quality)。
5)输出为 Markdown 表格。

我的场景与约束:{你的内容}

模板 2:数据清洗与标注一致性检查建议

你是数据标注质检员。下面是我标注数据的字段说明与 20 条样例(JSONL 或 CSV)。请你输出:
1)<<<ISSUES>>>:问题清单(缺字段/intent不在集合/重复样本/冲突标注/slots不一致/明显噪声)
2)<<<FIXES>>>:清洗与规范化建议(可执行步骤),并给“修复后字段口径”
3)<<<CHECKLIST>>>:我提交作业前的自检清单(至少 12 条)
输出要求:三段格式必须包含 ISSUES/FIXES/CHECKLIST 标记。

我的数据:{你的内容}

模板 3:生成训练代码(Embedding + 轻量分类器)

你是 Python 实践课助教。请基于 sentence-transformers + numpy 生成一个“意图识别训练脚本”,要求:
1)读取 JSONL 数据(字段:sample_id/raw_text/intent/slots/scene/guideline_version/ts_ms,可选 vision);
2)把 intent 映射为 label id;
3)用 sentence-transformers 把 raw_text 编码为向量;
4)用 numpy 实现一个最小可跑的多分类 softmax 线性分类器训练(含 train/val 切分、loss、accuracy、macro-F1、confusion matrix);
5)若数据里包含 `vision`(grade/diameter_mm/red_ratio/defect_score),支持两种方式任选其一:文本增强(拼 token)或特征拼接(embedding 后 concat 数值特征);
6)输出训练日志(每轮:loss/acc/f1),并保存一个 model.json(包含 label_map、权重矩阵、bias、model_version、dataset_version;若用了 vision,需记录特征规格);
7)不得引入第三方训练框架(不能用 sklearn/torch)。

我的意图集合与数据路径:{你的内容}

模板 4:报错排查与调参建议(定位→修复→预防)

下面是我训练意图识别模型的报错与日志片段:{你的内容}
请按“定位→修复→预防”输出:
1)先判断属于哪类问题:数据格式/标签映射/依赖安装/内存不足/维度不匹配/学习率不稳定;
2)给最多 8 步排查,每步写清我该检查什么、预期结果是什么;
3)给 5 条调参建议(学习率、轮数、batch、阈值、类别平衡),并说明为什么;
4)最后给一份“回归验证清单”(至少 8 条)确保修复有效且不引入回归。

  1. 为什么同一句话“把红苹果分出来”有时会导致不同同学标成不同意图?差异来自哪里?
  2. 如果没有 clarify/unknown,模型遇到不确定指令会怎么做?在工业场景会带来什么风险?
  1. 定义意图集合(Taxonomy)与边界规则(Guideline)。
  2. 采集与脱敏数据(raw_text),形成样本池。
  3. 按规范标注 intent 与 slots,做一致性检查与清洗。
  4. 划分 train/val/test(至少要有 val,用于评估与调参)。
  5. 训练基线模型并评估(先跑通,再优化)。
  6. 迭代:补数据/改规范/平衡类别/增加 unknown 阈值与澄清策略。

解释与自检:

1)写清“分类依据”(避免同学各自理解)

建议每个意图至少写 3 类信息:

2)冲突规则(分拣场景示例)

常见冲突:同一句话同时提到颜色与大小:

1)为什么要用 JSONL

2)数据自检脚本(只用标准库)

说明(与项目实现对齐):

from __future__ import annotations

"""
check_intent_data.py

用途:
- 对 intent_data.jsonl 做“结构化自检”,避免训练前因为数据格式问题导致训练脚本报错或指标异常。
- 输出样本总数、问题清单(最多 30 条)以及意图分布统计,便于快速发现:
  - 必填字段缺失/类型不对
  - sample_id 重复
  - intent 不在 intents.json 允许集合里

运行方式:
python check_intent_data.py <data.jsonl> <intents.json>
"""

import json
import sys
from collections import Counter
from pathlib import Path
from typing import Dict, List, Set, Tuple

# jsonl 每一行必须包含的字段(训练脚本会依赖其中的 sample_id/raw_text/intent)
REQUIRED_FIELDS = {"sample_id", "raw_text", "intent", "slots", "scene", "guideline_version", "ts_ms"}

def load_jsonl(path: Path) -> List[Dict]:
    """
    读取 jsonl(JSON Lines)文件:一行一个 JSON 对象。

    - 会忽略空行
    - 任意一行 JSON 解析失败就立即抛出异常,并标注行号,便于定位
    """
    rows: List[Dict] = []
    for i, line in enumerate(path.read_text(encoding="utf-8").splitlines(), start=1):
        ln = line.strip()
        if not ln:
            continue
        try:
            obj = json.loads(ln)
        except json.JSONDecodeError as e:
            raise RuntimeError(f"JSON decode error at line {i}: {e}") from e
        rows.append(obj)
    return rows

def check_rows(rows: List[Dict], intents: Set[str]) -> Tuple[List[str], Counter]:
    """
    对每条数据做规则检查,返回:
    - issues:问题列表(字符串)
    - intent_counter:意图分布计数器
    """
    issues: List[str] = []
    seen_ids: Set[str] = set()
    intent_counter: Counter = Counter()

    for idx, r in enumerate(rows, start=1):
        # 1) 字段完整性检查:缺字段直接记录并跳过该行后续检查(避免 KeyError 连锁报错)
        missing = REQUIRED_FIELDS - set(r.keys())
        if missing:
            issues.append(f"row#{idx}: missing fields: {sorted(missing)}")
            continue

        # 2) sample_id:用于追溯、去重、以及将结构化字段回填到样本(train_intent_model.py 会按 id 取 row)
        sid = r["sample_id"]
        if not isinstance(sid, str) or not sid.strip():
            issues.append(f"row#{idx}: invalid sample_id")
        elif sid in seen_ids:
            issues.append(f"row#{idx}: duplicate sample_id={sid}")
        else:
            seen_ids.add(sid)

        # 3) raw_text:文本特征的唯一来源(Embedding 会直接对它编码)
        text = r["raw_text"]
        if not isinstance(text, str) or not text.strip():
            issues.append(f"row#{idx}: invalid raw_text")

        # 4) intent:必须落在 intents.json 的允许集合中,否则训练/评估标签会不一致
        intent = r["intent"]
        if intent not in intents:
            issues.append(f"row#{idx}: intent not in set: {intent}")
        else:
            intent_counter[intent] += 1

        # 5) slots:目前训练脚本不直接使用 slots,但要求其为 dict,保证标注结构统一、便于后续扩展
        slots = r["slots"]
        if not isinstance(slots, dict):
            issues.append(f"row#{idx}: slots must be object/dict")

    return issues, intent_counter

def main() -> None:
    if len(sys.argv) < 3:
        print("usage: python check_intent_data.py <data.jsonl> <intents.json>")
        sys.exit(2)

    data_path = Path(sys.argv[1])
    intents_path = Path(sys.argv[2])

    # intents.json 约定为“intent 名称数组”,例如:["sort_by_color", "sort_by_size", ...]
    intents = set(json.loads(intents_path.read_text(encoding="utf-8")))
    rows = load_jsonl(data_path)
    issues, counter = check_rows(rows, intents)

    # 第一行 JSON 便于程序化读取;后续的人类可读输出用于课堂快速排查
    print(json.dumps({"total": len(rows), "issues_count": len(issues)}, ensure_ascii=False))
    for it in issues[:30]:
        print(f"- {it}")
    print(json.dumps({"intent_distribution": dict(counter)}, ensure_ascii=False, indent=2))

    # 约定:只要存在问题就返回非 0 退出码,方便在脚本/CI 中联动使用
    if issues:
        sys.exit(1)

if __name__ == "__main__":
    main()

逐段解释与自检要点:

运行示例(PowerShell):

python .\check_intent_data.py .\intent_data.jsonl .\intents.json

解释与自检:

五、练习(至少完成 2 题)

1)为你们自选题场景设计意图体系(至少 5 类),并写出每类的“定义 + 边界 + 5 条例句”。

2)用 JSONL 格式整理 60 条标注数据(每类至少 8 条),并运行数据自检脚本。

  1. 为什么仅看 accuracy 不够?当类别不平衡时会发生什么错觉?
  2. 如果模型输出一个意图,但置信度很低,你应该执行还是澄清?阈值怎么定?

本讲采用“Embedding + 轻量分类器”的路径:

  1. 使用语言模型(Embedding 模型)把 raw_text 编码为向量。
  2. 在向量上训练一个小的多分类器(softmax 线性层)。
  3. 用验证集评估,并把模型参数保存为可复验文件。

视觉联动提醒(跨课程引用,避免课程混淆):

阈值与 unknown(可用性关键点):

四、代码模板:训练与评估脚本(Embedding + numpy 训练)

说明(与项目实现对齐):

把下面脚本保存为 train_intent_model.py(或直接对照项目脚本)。它会:

from __future__ import annotations

"""
train_intent_model.py

核心思路:
1) 使用预训练文本编码模型(SentenceTransformer)把 raw_text 编码成语义向量(Embedding)。
2) 在向量上训练一个轻量 Softmax 线性分类器(多分类逻辑回归/线性层)。
3) 若数据中存在 vision 字段,则自动把“视觉特征 7 维”拼接到文本 embedding 后面一起训练。
"""

import json
import math
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Tuple

import numpy as np
from sentence_transformers import SentenceTransformer

@dataclass(frozen=True)
class Sample:
    sample_id: str
    raw_text: str
    intent: str

def load_jsonl(path: Path) -> List[Dict]:
    rows: List[Dict] = []
    for i, line in enumerate(path.read_text(encoding="utf-8").splitlines(), start=1):
        ln = line.strip()
        if not ln:
            continue
        try:
            rows.append(json.loads(ln))
        except json.JSONDecodeError as e:
            raise RuntimeError(f"JSON decode error at line {i}: {e}") from e
    return rows

def make_label_map(intents: List[str]) -> Dict[str, int]:
    return {name: i for i, name in enumerate(intents)}

def train_val_split(samples: List[Sample], *, val_ratio: float, seed: int) -> Tuple[List[Sample], List[Sample]]:
    rng = np.random.default_rng(seed)
    by_intent: Dict[str, List[Sample]] = {}
    for s in samples:
        by_intent.setdefault(s.intent, []).append(s)

    train: List[Sample] = []
    val: List[Sample] = []
    for _, group in by_intent.items():
        idx = np.arange(len(group))
        rng.shuffle(idx)

        if len(group) <= 2:
            n_val = 0
        else:
            n_val = int(math.floor(len(group) * float(val_ratio)))
            n_val = max(1, min(n_val, len(group) - 1))

        val.extend([group[i] for i in idx[:n_val].tolist()])
        train.extend([group[i] for i in idx[n_val:].tolist()])

    rng.shuffle(train)
    rng.shuffle(val)
    return train, val

def softmax(logits: np.ndarray) -> np.ndarray:
    x = logits - logits.max(axis=1, keepdims=True)
    e = np.exp(x)
    return e / e.sum(axis=1, keepdims=True)

def one_hot(y: np.ndarray, num_classes: int) -> np.ndarray:
    out = np.zeros((y.shape[0], num_classes), dtype=np.float32)
    out[np.arange(y.shape[0]), y] = 1.0
    return out

def macro_f1(y_true: np.ndarray, y_pred: np.ndarray, num_classes: int) -> float:
    f1s: List[float] = []
    for c in range(num_classes):
        tp = int(((y_true == c) & (y_pred == c)).sum())
        fp = int(((y_true != c) & (y_pred == c)).sum())
        fn = int(((y_true == c) & (y_pred != c)).sum())
        if tp == 0 and (fp > 0 or fn > 0):
            f1s.append(0.0)
            continue
        if tp == 0 and fp == 0 and fn == 0:
            f1s.append(0.0)
            continue
        prec = tp / (tp + fp) if (tp + fp) else 0.0
        rec = tp / (tp + fn) if (tp + fn) else 0.0
        f1 = (2 * prec * rec / (prec + rec)) if (prec + rec) else 0.0
        f1s.append(float(f1))
    return float(sum(f1s) / len(f1s)) if f1s else 0.0

def confusion_matrix(y_true: np.ndarray, y_pred: np.ndarray, num_classes: int) -> np.ndarray:
    cm = np.zeros((num_classes, num_classes), dtype=np.int64)
    for t, p in zip(y_true.tolist(), y_pred.tolist()):
        cm[int(t), int(p)] += 1
    return cm

def embed_texts(model: SentenceTransformer, texts: List[str]) -> np.ndarray:
    vecs = model.encode(
        texts,
        batch_size=32,
        show_progress_bar=True,
        convert_to_numpy=True,
        normalize_embeddings=True,
    )
    return np.ascontiguousarray(vecs.astype(np.float32))

def vision_features(row: Dict[str, Any]) -> np.ndarray:
    v = row.get("vision")
    if not isinstance(v, dict):
        return np.zeros((1, 7), dtype=np.float32)

    grade = str(v.get("grade", "")).strip().upper()
    grade_a = 1.0 if grade == "A" else 0.0
    grade_b = 1.0 if grade == "B" else 0.0
    grade_c = 1.0 if grade == "C" else 0.0

    diameter_mm = float(v.get("diameter_mm", 0.0))
    red_ratio = float(v.get("red_ratio", 0.0))
    defect_score = float(v.get("defect_score", 0.0))

    diameter_norm = max(0.0, min(1.0, diameter_mm / 100.0))
    red_ratio_norm = max(0.0, min(1.0, red_ratio))
    defect_norm = max(0.0, min(1.0, defect_score))

    has_vision = 1.0
    feats = np.array(
        [ [diameter_norm, red_ratio_norm, defect_norm, grade_a, grade_b, grade_c, has_vision] ],
        dtype=np.float32,
    )
    return feats

def concat_features(text_vecs: np.ndarray, feats_list: List[np.ndarray]) -> np.ndarray:
    feats = np.concatenate(feats_list, axis=0)
    return np.concatenate([text_vecs, feats], axis=1)

def train_softmax(
    x_train: np.ndarray,
    y_train: np.ndarray,
    x_val: np.ndarray,
    y_val: np.ndarray,
    num_classes: int,
    *,
    lr: float,
    epochs: int,
    weight_decay: float,
    seed: int,
) -> Tuple[np.ndarray, np.ndarray]:
    rng = np.random.default_rng(seed)
    n, d = x_train.shape

    w = (rng.standard_normal((d, num_classes)).astype(np.float32)) * 0.01
    b = np.zeros((1, num_classes), dtype=np.float32)

    y_train_oh = one_hot(y_train, num_classes)
    class_counts = np.bincount(y_train, minlength=num_classes).astype(np.float32)
    class_counts = np.maximum(class_counts, 1.0)
    class_weights = (float(y_train.shape[0]) / (float(num_classes) * class_counts)).astype(np.float32)
    sample_weights = class_weights[y_train].astype(np.float32).reshape(-1, 1)
    weight_denom = float(sample_weights.sum())

    for ep in range(1, epochs + 1):
        logits = x_train @ w + b
        probs = softmax(logits)

        eps = 1e-12
        per_row = np.sum(y_train_oh * np.log(probs + eps), axis=1, keepdims=True)
        loss = -float((sample_weights * per_row).sum() / weight_denom)
        loss += 0.5 * weight_decay * float((w * w).sum())

        grad_logits = ((probs - y_train_oh) * sample_weights) / weight_denom
        grad_w = x_train.T @ grad_logits + (weight_decay * w)
        grad_b = grad_logits.sum(axis=0, keepdims=True)

        w -= lr * grad_w
        b -= lr * grad_b

        if ep == 1 or ep % 2 == 0 or ep == epochs:
            val_probs = softmax(x_val @ w + b)
            y_pred = val_probs.argmax(axis=1)
            acc = float((y_pred == y_val).mean())
            f1 = macro_f1(y_val, y_pred, num_classes)
            print(json.dumps({"epoch": ep, "loss": float(loss), "val_acc": acc, "val_macro_f1": f1}, ensure_ascii=False))

    return w, b

def main() -> None:
    if len(sys.argv) < 5:
        print("usage: python train_intent_model.py <data.jsonl> <intents.json> <dataset_version> <model_version>")
        sys.exit(2)

    data_path = Path(sys.argv[1])
    intents_path = Path(sys.argv[2])
    dataset_version = sys.argv[3]
    model_version = sys.argv[4]

    intents = json.loads(intents_path.read_text(encoding="utf-8"))
    label_map = make_label_map(intents)

    rows = load_jsonl(data_path)
    row_by_id: Dict[str, Dict[str, Any]] = {}
    for r in rows:
        sid = str(r.get("sample_id", "")).strip()
        if sid:
            row_by_id[sid] = r

    samples: List[Sample] = []
    for r in rows:
        intent = str(r.get("intent", "")).strip()
        if intent not in label_map:
            continue
        sample_id = str(r.get("sample_id", "")).strip()
        raw_text = str(r.get("raw_text", "")).strip()
        if not sample_id or not raw_text:
            continue
        samples.append(Sample(sample_id=sample_id, raw_text=raw_text, intent=intent))

    if len(samples) < 30:
        raise RuntimeError("too few samples: need at least 30 valid samples for a meaningful split")

    train_samples, val_samples = train_val_split(samples, val_ratio=0.2, seed=42)

    st_model_name = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
    st = SentenceTransformer(st_model_name)

    x_train_text = embed_texts(st, [s.raw_text for s in train_samples])
    x_val_text = embed_texts(st, [s.raw_text for s in val_samples])

    train_rows = [row_by_id.get(s.sample_id, {}) for s in train_samples]
    val_rows = [row_by_id.get(s.sample_id, {}) for s in val_samples]

    use_vision = any(isinstance(r.get("vision"), dict) for r in rows)
    if use_vision:
        x_train = concat_features(x_train_text, [vision_features(r) for r in train_rows])
        x_val = concat_features(x_val_text, [vision_features(r) for r in val_rows])
    else:
        x_train = x_train_text
        x_val = x_val_text

    y_train = np.array([label_map[s.intent] for s in train_samples], dtype=np.int64)
    y_val = np.array([label_map[s.intent] for s in val_samples], dtype=np.int64)

    w, b = train_softmax(
        x_train,
        y_train,
        x_val,
        y_val,
        num_classes=len(intents),
        lr=0.3,
        epochs=30,
        weight_decay=1e-3,
        seed=7,
    )

    val_probs = softmax(x_val @ w + b)
    y_pred = val_probs.argmax(axis=1)
    acc = float((y_pred == y_val).mean())
    f1 = macro_f1(y_val, y_pred, len(intents))
    cm = confusion_matrix(y_val, y_pred, len(intents))

    print(json.dumps({"final_val_acc": acc, "final_val_macro_f1": f1}, ensure_ascii=False))
    print("confusion_matrix(rows=true, cols=pred):")
    print(cm)

    out = {
        "model_version": model_version,
        "dataset_version": dataset_version,
        "created_ts_ms": int(time.time() * 1000),
        "embedding_model": st_model_name,
        "label_map": label_map,
        "weights": w.tolist(),
        "bias": b.reshape(-1).tolist(),
    }
    if use_vision:
        out["vision_feature_spec"] = [
            "diameter_norm_0_1",
            "red_ratio_norm_0_1",
            "defect_norm_0_1",
            "grade_A",
            "grade_B",
            "grade_C",
            "has_vision",
        ]
    Path("model.json").write_text(json.dumps(out, ensure_ascii=False, indent=2), encoding="utf-8")
    print("saved: model.json")

if __name__ == "__main__":
    main()

逐段解释与自检要点:

运行示例(PowerShell):

python .\train_intent_model.py .\intent_data.jsonl .\intents.json v0.1 v0.4

解释与自检:

补充:用 model.json 做推理预测(项目提供 predict_intent.py)

训练脚本会把分类器权重与 label_map 保存到 model.json。推理时你只需要加载这个文件并输入一句话,就能得到:

纯文本推理示例(PowerShell):

python .\predict_intent.py .\model.json "启动输送线"

若模型训练时启用了 vision_feature_spec(意味着训练数据中出现过 vision 字段),你可以选择传入视觉上下文:

python .\predict_intent.py .\model.json "把A级苹果送到1号口" --vision-json "{\"grade\":\"A\",\"diameter_mm\":82.1,\"red_ratio\":0.72,\"defect_score\":0.04}"

解释与自检:

五、把模型接回智能体闭环(与 07 对齐,最小映射)

训练后你拿到的是:intent -> label_id 与一组权重。工程上你需要把“意图”映射为“工具/流程”:

intent(sort_by_color)  -> tool(sort_color)
intent(sort_by_size)   -> tool(sort_size)
intent(sort_by_defect) -> tool(sort_defect)
intent(query_sorting_status) -> tool(query_status)
intent(device_control) -> tool(send_device_command) + 安全门禁 + 二次确认
intent(clarify/unknown) -> 生成澄清问题,不执行

自检要点:

六(加分扩展):拼接视觉上下文,让“意图识别”真正消费 YOLOv8 的分拣指标(跨课程引用)

适用场景:

1)文本增强(最简单):

把A级苹果送到1号口 [GRADE=A] [DIAMETER_MM=78.2] [RED_RATIO=0.62] [DEFECT_SCORE=0.05]

优点:实现快,改动小;缺点:数值区间与离散化需要你写规范,否则标注不一致。

2)特征拼接(更工程化):

1)标注数据扩展字段(建议)

在原 JSONL 基础上,允许每条样本可选包含 vision 字段:

{"sample_id":"S-0101","raw_text":"把A级苹果送到1号口","intent":"sort_by_grade","slots":{"grade":"A","target_lane":"1"},"vision":{"grade":"A","diameter_mm":78.2,"red_ratio":0.62,"defect_score":0.05},"scene":"sorting","guideline_version":"v0.1","ts_ms":1710000010000}

解释与自检:

2)取证:从 ROS2 视觉话题拿到可标注的事实字段(Ubuntu + ROS2 环境)

ros2 topic echo /sorting/perception/apple_quality --once

解释与自检:

3)训练代码改造(特征拼接版,核心片段)

目标:在不引入新框架的前提下,把 vision 数值特征拼到文本 embedding 后面。

说明:

import numpy as np
from typing import Any, Dict, List

def vision_features(row: Dict[str, Any]) -> np.ndarray:
    v = row.get("vision")
    if not isinstance(v, dict):
        return np.zeros((1, 7), dtype=np.float32)

    grade = str(v.get("grade", "")).strip().upper()
    grade_a = 1.0 if grade == "A" else 0.0
    grade_b = 1.0 if grade == "B" else 0.0
    grade_c = 1.0 if grade == "C" else 0.0

    diameter_mm = float(v.get("diameter_mm", 0.0))
    red_ratio = float(v.get("red_ratio", 0.0))
    defect_score = float(v.get("defect_score", 0.0))

    diameter_norm = max(0.0, min(1.0, diameter_mm / 100.0))
    red_ratio_norm = max(0.0, min(1.0, red_ratio))
    defect_norm = max(0.0, min(1.0, defect_score))

    has_vision = 1.0
    feats = np.array(
        [ [diameter_norm, red_ratio_norm, defect_norm, grade_a, grade_b, grade_c, has_vision] ],
        dtype=np.float32,
    )
    return feats

def concat_features(text_vecs: np.ndarray, feats_list: List[np.ndarray]) -> np.ndarray:
    feats = np.concatenate(feats_list, axis=0)
    return np.concatenate([text_vecs, feats], axis=1)

逐段解释与自检要点:

main 函数里的最小替换点(只说明改哪里,不重复整段代码):

七、练习(至少完成 2 题)

1)用你们的自选题数据跑通训练脚本,记录 final_val_accfinal_val_macro_f1,并截图训练日志(至少 3 条 epoch 输出)。

2)做一次“数据驱动优化”:针对混淆最严重的 2 个意图类别,各补 10 条样本并重新训练,比较 macro-F1 是否提升。

课程思政(融入点:攻坚克难的探索精神)

  1. 你们遇到的“最难标/最容易混淆”的两类意图是什么?为什么?你准备用什么数据策略解决?
  2. 如果模型误判导致错误执行,你如何通过“意图体系 + 数据版本 + 训练日志”追溯并修复?

作业:布置

1)提交自选题场景的意图分类体系文档(含分类依据、各类别说明)。

2)提交模型训练代码(AI 生成 + 人工优化版)、训练日志截图,附 150 字左右说明,记录模型初始准确率及问题。

3)提交标注数据清洗与格式规范化后的文件。

Markdown 自检清单(提交前自己勾一遍)